7.04. Справочник по Logstash
Справочник по Logstash
Logstash — это серверная программа с открытым исходным кодом для сбора, обработки и пересылки событий и журналов. Она принимает данные из множества источников, преобразует их в соответствии с заданными правилами и отправляет в различные системы назначения, такие как Elasticsearch, файлы, базы данных или очереди сообщений.
Архитектура Logstash
Logstash работает по конвейерной модели, состоящей из трёх основных компонентов:
- Input — получает данные из внешних источников.
- Filter — преобразует и обогащает данные.
- Output — отправляет обработанные данные в целевые системы.
Каждый компонент реализуется через плагины. Logstash поставляется с сотнями встроенных плагинов и поддерживает пользовательские расширения.
Input-плагины
Input-плагины определяют источники данных. Ниже приведены наиболее часто используемые.
file
Читает события из файлов, аналогично команде tail -F.
Основные параметры:
path— путь к файлу или шаблон (например,/var/log/*.log).start_position— позиция начала чтения (beginningилиend).sincedb_path— путь к файлу, хранящему смещение последнего прочитанного байта.codec— кодек для разбора содержимого (по умолчаниюplain).type— добавляет полеtypeк событию.tags— массив тегов, добавляемых к событию.
Пример:
input {
file {
path => "/var/log/app.log"
start_position => "beginning"
sincedb_path => "/dev/null"
type => "application_log"
}
}
beats
Получает данные от Filebeat, Metricbeat и других Beats.
Основные параметры:
port— порт прослушивания (например,5044).host— адрес интерфейса для привязки ("0.0.0.0"для всех).ssl— включает TLS/SSL.ssl_certificate,ssl_key— пути к сертификату и ключу при использовании SSL.
Пример:
input {
beats {
port => 5044
}
}
http
Принимает события через HTTP(S) POST-запросы.
Основные параметры:
port— порт HTTP-сервера.host— хост для привязки.ssl— включает HTTPS.response_headers— заголовки ответа клиенту.codec— кодек для разбора тела запроса (json,plain,lineи др.).
Пример:
input {
http {
port => 8080
codec => json
}
}
stdin
Читает события из стандартного ввода. Полезен для тестирования.
Параметры:
codec— кодек разбора (line,json_linesи др.).
Пример:
input {
stdin {
codec => json_lines
}
}
tcp / udp
Принимает события по протоколам TCP или UDP.
Общие параметры:
port— порт прослушивания.host— хост привязки.codec— кодек (line,json,netflowи др.).type— тип события.- Для TCP:
ssl_enable,ssl_cert,ssl_key.
Пример TCP:
input {
tcp {
port => 9999
codec => json
}
}
Filter-плагины
Filter-плагины применяются для преобразования событий. Порядок фильтров в конфигурации важен.
grok
Разбирает неструктурированный текст с помощью регулярных выражений и шаблонов.
Основные параметры:
match— сопоставление поля с шаблоном (например,"message" => "%{COMBINEDAPACHELOG}").patterns_dir— каталог с пользовательскими шаблонами.break_on_match— прекращать ли обработку после первого совпадения.tag_on_failure— тег, добавляемый при неудаче (по умолчанию_grokparsefailure).
Встроенные шаблоны: IP, TIMESTAMP_ISO8601, NUMBER, WORD, DATA, GREEDYDATA и сотни других.
Пример:
filter {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}" }
}
}
date
Преобразует строковое представление времени в поле @timestamp.
Основные параметры:
match— массив из имени поля и формата даты (например,["log_timestamp", "yyyy-MM-dd HH:mm:ss"]).timezone— часовой пояс (например,"Europe/Moscow").target— целевое поле (по умолчанию@timestamp).
Пример:
filter {
date {
match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
timezone => "UTC"
}
}
mutate
Выполняет базовые операции над полями: переименование, удаление, преобразование типов, объединение и разделение.
Основные действия:
rename— переименовывает поле.remove_field— удаляет поле.convert— преобразует тип (string,integer,float,boolean).add_field— добавляет новое поле.split— разделяет строку по разделителю.join— объединяет массив в строку.strip— удаляет пробелы по краям.gsub— замена подстрок по регулярному выражению.
Пример:
filter {
mutate {
convert => { "bytes" => "integer" }
rename => { "host_ip" => "client_ip" }
remove_field => ["temp_field"]
}
}
json
Разбирает JSON-строку из указанного поля и вставляет результат как подобъект.
Основные параметры:
source— имя поля с JSON-строкой.target— имя поля, куда поместить результат (по умолчанию корень события).skip_on_invalid_json— пропускать ли событие при ошибке разбора.
Пример:
filter {
json {
source => "message"
target => "parsed"
}
}
kv (key-value)
Извлекает пары ключ=значение из строки (часто используется для логов вида user=admin action=login).
Основные параметры:
source— поле для разбора.field_split— символ разделения полей (по умолчанию&).value_split— символ разделения ключа и значения (по умолчанию=).target— целевое поле.include_keys,exclude_keys— фильтрация ключей.
Пример:
filter {
kv {
source => "message"
field_split => " "
value_split => "="
}
}
geoip
Добавляет географическую информацию на основе IP-адреса.
Основные параметры:
source— поле с IP-адресом.target— поле для сохранения результата (по умолчаниюgeoip).database— путь к MaxMind GeoLite2 базе (.mmdb).fields— список полей для извлечения (city_name,country_code2,locationи др.).
Пример:
filter {
geoip {
source => "client_ip"
database => "/usr/share/GeoIP/GeoLite2-City.mmdb"
fields => ["city_name", "country_code2", "location"]
}
}
useragent
Разбирает строку User-Agent и извлекает информацию о браузере, ОС и устройстве.
Основные параметры:
source— поле со строкой User-Agent.target— целевое поле (по умолчаниюuser_agent).regexes— пользовательские регулярные выражения.
Пример:
filter {
useragent {
source => "agent"
target => "browser"
}
}
drop
Отбрасывает событие целиком. Используется в условных блоках.
Параметры:
percentage— процент событий для отбрасывания (для выборочной фильтрации).
Пример:
filter {
if [level] == "debug" {
drop {}
}
}
clone
Создаёт копию события. Полезно для отправки одного события в несколько выходов с разными преобразованиями.
Параметры:
clones— массив имён клонов.add_tag— теги для добавления к клонам.
Пример:
filter {
clone {
clones => ["archive_copy"]
add_tag => ["archived"]
}
}
Output-плагины
Output-плагины отправляют обработанные события в целевые системы.
elasticsearch
Отправляет события в Elasticsearch.
Основные параметры:
hosts— массив хостов Elasticsearch (["http://localhost:9200"]).index— шаблон имени индекса (например,"logs-%{+YYYY.MM.dd}").document_id— уникальный идентификатор документа.user,password— учётные данные при включённой аутентификации.ssl_certificate_verification— проверка SSL-сертификата.ilm_enabled— использование Lifecycle Management.pipeline— имя ingest pipeline в Elasticsearch.
Пример:
output {
elasticsearch {
hosts => ["http://es-cluster:9200"]
index => "app-logs-%{+YYYY.MM.dd}"
user => "elastic"
password => "changeme"
}
}
file
Записывает события в файл.
Основные параметры:
path— путь к файлу.codec— кодек сериализации (json_lines,plain,rubydebug).write_behavior— поведение записи (overwrite,append).flush_interval— интервал сброса буфера (в секундах).
Пример:
output {
file {
path => "/var/log/processed/%{type}.log"
codec => json_lines
}
}
stdout
Выводит события в стандартный поток вывода. Используется для отладки.
Параметры:
codec— кодек (rubydebug,json,dots).
Пример:
output {
stdout {
codec => rubydebug
}
}
http
Отправляет события через HTTP(S) POST или PUT.
Основные параметры:
url— URL конечной точки.http_method— метод (post,put).format— формат тела (json,form).headers— заголовки запроса.user,password— базовая аутентификация.ssl_certificate_validation— проверка SSL.
Пример:
output {
http {
url => "https://api.example.com/events"
http_method => "post"
format => "json"
headers => { "Authorization" => "Bearer xyz" }
}
}
kafka
Отправляет события в Apache Kafka.
Основные параметры:
bootstrap_servers— список брокеров ("host1:9092,host2:9092").topic_id— тема Kafka.key— ключ сообщения (для партиционирования).compression_type— тип сжатия (none,gzip,snappy,lz4).ssl— настройки SSL/TLS.sasl— настройки SASL аутентификации.
Пример:
output {
kafka {
bootstrap_servers => "kafka:9092"
topic_id => "logstash-events"
compression_type => "snappy"
}
}
null
Отбрасывает события без вывода. Используется для тестирования производительности.
Пример:
output {
null {}
}
Кодеки (Codecs)
Кодеки определяют, как данные читаются из источника или записываются в назначение. Они работают внутри input- и output-плагинов и отвечают за сериализацию и десериализацию событий.
plain
Простейший кодек: каждая строка рассматривается как отдельное событие. Используется по умолчанию во многих input-плагинах.
Параметры:
charset— кодировка (например,"UTF-8").
json
Разбирает строку как JSON-объект и создаёт событие на его основе.
Параметры:
charset— кодировка.target— поле, куда поместить результат (по умолчанию корень события).
json_lines
Обрабатывает поток JSON-объектов, каждый из которых занимает одну строку (newline-delimited JSON).
Параметры:
charset— кодировка.
multiline
Объединяет несколько строк в одно событие по заданному шаблону. Часто используется для логов с многострочными стеками ошибок.
Основные параметры:
pattern— регулярное выражение, определяющее начало нового события.negate— инвертировать условие (trueозначает, что строки, не соответствующие шаблону, присоединяются к предыдущей).what— указывает, к какой части относится совпадение (previousилиnext).max_lines— максимальное количество строк в одном событии.max_bytes— максимальный размер буфера в байтах.
Пример для Java-стеков:
codec => multiline {
pattern => "^\s"
what => "previous"
negate => true
}
rubydebug
Форматирует событие в человекочитаемый вид с цветами и отступами (только для вывода). Аналог stdout { codec => rubydebug }.
line
Читает входной поток построчно. Похож на plain, но позволяет явно указать разделитель строк.
Параметры:
delimiter— символ окончания строки (по умолчанию\n).
Условные конструкции
Logstash поддерживает условные выражения (if, else if, else) для управления потоком обработки.
Синтаксис:
if [field] == "value" {
mutate { ... }
} else if [status] =~ /^5\d\d$/ {
drop {}
} else {
# действие по умолчанию
}
Операторы сравнения:
==,!=,<,>,<=,>==~— совпадение с регулярным выражением!~— отсутствие совпаденияin— проверка вхождения (для массивов и строк)not in— отрицание вхождения
Логические операторы:
and,or,not
Примеры:
if "error" in [message] {
mutate { add_tag => ["error_log"] }
}
if [log_level] == "DEBUG" and [environment] == "production" {
drop {}
}
Поля события
Каждое событие в Logstash — это документ с полями. Поля могут быть скалярами (строка, число, булево), массивами или вложенными объектами.
Доступ к полям:
[field]— простое поле[nested][field]— вложенное поле[array][0]— первый элемент массива
Специальные поля:
@timestamp— временная метка события (автоматически добавляется при создании события)@version— версия формата события (обычно"1")host— имя хоста, с которого пришло событие (если источник предоставляет)message— исходное содержимое события
Настройки производительности и масштабирования
Pipeline Workers
Logstash использует несколько рабочих потоков для параллельной обработки событий.
Параметр:
-w Nилиpipeline.workers: Nвlogstash.yml— количество рабочих потоков (по умолчанию равно числу ядер CPU).
Рекомендация: не увеличивать сверх числа ядер без профилирования — может вызвать конкуренцию за ресурсы.
Batch Size
Количество событий, обрабатываемых одним рабочим за раз.
Параметр:
pipeline.batch.size(по умолчанию125)
Увеличение размера пакета повышает пропускную способность, но увеличивает задержку.
Batch Delay
Максимальное время ожидания заполнения пакета перед обработкой.
Параметр:
pipeline.batch.delay(в миллисекундах, по умолчанию50)
Queue
Logstash поддерживает две очереди:
- In-memory queue — по умолчанию, быстрая, но теряет данные при аварийном завершении.
- Persistent queue — на диске, обеспечивает отказоустойчивость.
Настройки persistent queue (logstash.yml):
queue.type: persisted
path.queue: "/var/lib/logstash/queue"
queue.page_capacity: 250mb
queue.max_events: 0
queue.max_bytes: 10gb
Мониторинг Logstash
Logstash предоставляет метрики через HTTP API и может интегрироваться с мониторинговыми системами.
Встроенный HTTP API
Включается в logstash.yml:
http.host: "127.0.0.1"
http.port: 9600
Эндпоинты:
GET /— общая информацияGET /_node/pipeline— состояние pipelineGET /_node/stats— статистика по входам, фильтрам, выходам
Метрики
Включают:
- количество обработанных событий
- скорость обработки (events/sec)
- задержки
- ошибки
Можно отправлять метрики в Elasticsearch, Prometheus (через плагин prometheus_exporter) или StatsD.
CLI-параметры запуска
Основные флаги командной строки:
-f CONFIG_FILE— путь к файлу конфигурации-e CONFIG_STRING— конфигурация в виде строки-l LOG_DIR— каталог для логов-w WORKERS— количество рабочих потоков--config.reload.automatic— автоматическая перезагрузка при изменении конфигурации--config.reload.interval=3s— интервал проверки изменений--quiet,--verbose,--debug— уровни логирования
Пример запуска:
bin/logstash -f /etc/logstash/conf.d/app.conf --config.reload.automatic
Примеры полных конфигураций
1. Обработка Apache-логов
input {
file {
path => "/var/log/apache2/access.log"
start_position => "beginning"
sincedb_path => "/dev/null"
type => "apache_access"
}
}
filter {
if [type] == "apache_access" {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
geoip {
source => "clientip"
}
useragent {
source => "agent"
target => "user_agent"
}
mutate {
remove_field => ["message", "timestamp"]
}
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "apache-logs-%{+YYYY.MM.dd}"
}
stdout { codec => rubydebug }
}
2. Приём JSON через HTTP и отправка в Kafka
input {
http {
port => 8080
codec => json
}
}
filter {
mutate {
add_field => { "received_at" => "%{@timestamp}" }
}
}
output {
kafka {
bootstrap_servers => "kafka:9092"
topic_id => "incoming-events"
compression_type => "snappy"
}
}
3. Фильтрация и клонирование для архива
input {
beats {
port => 5044
}
}
filter {
if [log][level] == "error" {
clone {
clones => ["archive"]
add_tag => ["archived_error"]
}
}
}
output {
if "archived_error" in [tags] {
file {
path => "/var/log/errors-archive/%{+YYYY-MM-dd}.log"
codec => json_lines
}
}
elasticsearch {
hosts => ["http://es:9200"]
index => "logs-%{[@metadata][beat]}-%{+YYYY.MM.dd}"
}
}
Расширенные фильтры и специализированные плагины
csv
Разбирает строки в формате CSV (значения, разделённые запятыми или другим символом).
Основные параметры:
separator— символ-разделитель (по умолчанию,)columns— массив имён колонок, которые будут использоваться как ключи полейsource— поле, содержащее строку CSVtarget— поле, в которое помещаются результаты (по умолчанию корень события)quote_char— символ кавычек для экранирования значений (по умолчанию")
Пример:
filter {
csv {
separator => ";"
columns => ["timestamp", "user_id", "action", "status"]
source => "message"
}
}
urldecode
Декодирует URL-кодированные строки (например, %D0%9F%D1%80%D0%B8%D0%B2%D0%B5%D1%82 → Привет).
Параметры:
field— имя поля для декодированияtarget— целевое поле (если не указано, заменяет исходное)
Пример:
filter {
urldecode {
field => "query_string"
}
}
split
Разделяет одно событие на несколько по указанному полю (обычно массиву).
Параметры:
field— поле-массив, по которому выполняется разделениеterminator— если поле — строка, используется как разделитель
Пример:
# Исходное событие: { "tags": ["a", "b", "c"] }
filter {
split { field => "tags" }
}
# Результат: три события с tags = "a", "b", "c"
fingerprint
Создаёт уникальный хеш на основе указанных полей. Используется для дедупликации или анонимизации.
Параметры:
source— поле или массив полей для хешированияtarget— имя нового поля для хешаmethod— алгоритм (MD5,SHA1,SHA256,MURMUR3)key— секретный ключ для HMAC (опционально)
Пример:
filter {
fingerprint {
source => ["client_ip", "user_agent"]
target => "session_id"
method => "MURMUR3"
}
}
aggregate
Группирует события по ключу и накапливает данные из нескольких событий в одно. Полезен для сбора связанных записей (например, начало и завершение транзакции).
Параметры:
task_id— выражение для формирования ключа группировкиcode— Ruby-код для агрегацииpush_previous_event— отправлять ли предыдущее событие при новом ключеtimeout— время ожидания завершения группы
Пример (накопление всех строк лога под одним ID):
filter {
aggregate {
task_id => "%{transaction_id}"
code => "
map['lines'] ||= []
map['lines'] << event.get('message')
event.cancel()
"
push_map_as_event_on_timeout => true
timeout_task_id_field => "transaction_id"
timeout => 30
}
}
Безопасность и управление доступом
TLS/SSL в input и output
Logstash поддерживает шифрование трафика через TLS во многих плагинах.
Общие параметры для SSL:
ssl_enable— включить SSLssl_verify— проверять сертификат клиента/сервераssl_cert— путь к сертификатуssl_key— путь к приватному ключуssl_key_passphrase— пароль к ключуssl_ca_cert— путь к CA-сертификату
Пример безопасного HTTP-входа:
input {
http {
port => 8443
ssl => true
ssl_certificate => "/etc/logstash/certs/server.crt"
ssl_key => "/etc/logstash/certs/server.key"
}
}
Аутентификация
- В
elasticsearchoutput: параметрыuserиpassword - В
httpoutput/input: заголовкиAuthorization, базовая аутентификация - В Kafka: SASL/SCRAM, Kerberos (через
sasl_jaas_config,sasl_mechanism)
Управление конфигурациями
Организация файлов
Рекомендуется разделять конфигурацию на части:
01-inputs.conf10-filters-common.conf20-filters-app.conf99-outputs.conf
Logstash загружает все .conf файлы из указанной директории в лексикографическом порядке.
Переменные окружения
Можно использовать переменные окружения в конфигурации:
input {
elasticsearch {
hosts => "${ES_HOSTS:localhost:9200}"
user => "${ES_USER:elastic}"
password => "${ES_PASSWORD}"
}
}
Если переменная не задана, используется значение после двоеточия (по умолчанию).
Автоматическая перезагрузка
Включается флагом --config.reload.automatic. Logstash отслеживает изменения в файлах и перезагружает pipeline без остановки.
Важно: ошибки в новой конфигурации не приводят к остановке — старая версия продолжает работать.
Обработка ошибок и отладка
Теги ошибок
Некоторые плагины автоматически добавляют теги при ошибках:
_grokparsefailure_jsonparsefailure_dateparsefailure
Можно использовать их для маршрутизации проблемных событий:
output {
if "_grokparsefailure" in [tags] {
file { path => "/var/log/logstash/grok_failures.log" }
} else {
elasticsearch { ... }
}
}
Логирование
Уровни логов: ERROR, WARN, INFO, DEBUG.
Файлы логов по умолчанию: logs/logstash-plain.log, logs/logstash-json.log.
Можно настроить в config/log4j2.properties.
Dry-run и проверка синтаксиса
Команда для проверки конфигурации без запуска:
bin/logstash --config.test_and_exit -f my.conf
Интеграция с Beats
Beats (Filebeat, Metricbeat и др.) являются лёгкими агентами для сбора данных и отправки в Logstash.
Рекомендуемая схема:
- Filebeat читает логи → отправляет в Logstash через протокол Lumberjack
- Logstash обрабатывает → отправляет в Elasticsearch
Преимущества:
- Filebeat гарантирует доставку (подтверждение получения)
- Минимальное потребление ресурсов на хосте-источнике
- Поддержка модулей (например,
filebeat modules enable nginx)
Конфигурация Filebeat для Logstash:
output.logstash:
hosts: ["logstash-host:5044"]
Пользовательские плагины
Logstash позволяет создавать собственные плагины на Ruby.
Типы плагинов:
- Input
- Filter
- Output
- Codec
Структура плагина:
- Наследуется от базового класса (
LogStash::Inputs::Baseи др.) - Реализует методы
register,run,close - Упаковывается как gem
Установка:
bin/logstash-plugin install /path/to/my-plugin.gem